/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.secondaryindex.load;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.util.CarbonLoaderUtil;

import org.apache.log4j.Logger;
import org.apache.spark.sql.index.CarbonIndexUtil;
import org.apache.spark.sql.secondaryindex.command.ErrorMessage;

public class CarbonInternalLoaderUtil {

  private static final Logger LOGGER = LogServiceFactory.getLogService(
      CarbonInternalLoaderUtil.class.getName());

  public static List<String> getListOfValidSlices(LoadMetadataDetails[] details) {
    List<String> activeSlices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    for (LoadMetadataDetails oneLoad : details) {
      // External added segments are not loaded to SI
      if (oneLoad.getPath() == null && SegmentStatus.SUCCESS.equals(oneLoad.getSegmentStatus())
          || SegmentStatus.LOAD_PARTIAL_SUCCESS.equals(oneLoad.getSegmentStatus())
          || SegmentStatus.MARKED_FOR_UPDATE.equals(oneLoad.getSegmentStatus())) {
        activeSlices.add(oneLoad.getLoadName());
      }
    }
    return activeSlices;
  }

  /**
   * This method will return the mapping of valid segments to segment load start time
   */
  public static Map<String, Long> getSegmentToLoadStartTimeMapping(LoadMetadataDetails[] details) {
    Map<String, Long> segmentToLoadStartTimeMap = new HashMap<>(details.length);
    for (LoadMetadataDetails oneLoad : details) {
      // valid segments will only have Success status
      if (SegmentStatus.SUCCESS.equals(oneLoad.getSegmentStatus())
          || SegmentStatus.LOAD_PARTIAL_SUCCESS.equals(oneLoad.getSegmentStatus())) {
        segmentToLoadStartTimeMap.put(oneLoad.getLoadName(), oneLoad.getLoadStartTime());
      }
    }
    return segmentToLoadStartTimeMap;
  }

  /**
   * This API will write the load level metadata for the load management module inorder to
   * manage the load and query execution management smoothly.
   *
   * @return boolean which determines whether status update is done or not.
   */
  public static boolean recordLoadMetadata(List<LoadMetadataDetails> newLoadMetadataDetails,
      List<String> validSegments, CarbonTable carbonTable, List<CarbonTable> indexCarbonTables,
      String databaseName, String tableName) {
    boolean status = false;
    String metaDataFilepath = carbonTable.getMetadataPath();
    AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
    try {
      int retryCount = CarbonLockUtil
          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
      int maxTimeout = CarbonLockUtil
          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
        LOGGER.info("Acquired lock for table" + databaseName + "." + tableName
            + " for table status update");

        if (isSegmentsAlreadyCompactedForNewMetaDataDetails(indexCarbonTables, tableName,
            newLoadMetadataDetails)) {
          return false;
        }

        LoadMetadataDetails[] currentLoadMetadataDetails = SegmentStatusManager.readLoadMetadata(
            metaDataFilepath);

        List<LoadMetadataDetails> updatedLoadMetadataDetails = new ArrayList<>(
            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

        // check which load needs to be overwritten which are in in progress state
        boolean found = false;
        for (int i = 0; i < currentLoadMetadataDetails.length; i++) {
          for (LoadMetadataDetails newLoadMetadataDetail : newLoadMetadataDetails) {
            if (currentLoadMetadataDetails[i].getLoadName().equals(
                newLoadMetadataDetail.getLoadName())) {
              currentLoadMetadataDetails[i] = newLoadMetadataDetail;
              found = true;
              break;
            }
          }
          updatedLoadMetadataDetails.add(currentLoadMetadataDetails[i]);
        }

        // check if newLoadMetadataDetail has segments which are not in  currentLoadMetaDetails
        // and add them to the updatedLoadMetadataDetails
        boolean foundNext = false;
        for (int i = 0; i < newLoadMetadataDetails.size(); i++) {
          foundNext = false;
          for (int j = 0; j < currentLoadMetadataDetails.length; j++) {
            if (newLoadMetadataDetails.get(i).getLoadName().equals(
                currentLoadMetadataDetails[j].getLoadName())) {
              foundNext = true;
              break;
            }
            if (j == currentLoadMetadataDetails.length - 1 && !foundNext) {
              // if not found in the list then add it
              updatedLoadMetadataDetails.add(newLoadMetadataDetails.get(i));
              found = true;
            }
          }
        }

        // when data load is done for first time, add all the details
        if (currentLoadMetadataDetails.length == 0 || !found) {
          updatedLoadMetadataDetails.addAll(newLoadMetadataDetails);
        }

        List<String> indexTables = CarbonIndexUtil.getSecondaryIndexes(carbonTable);
        if (!indexTables.isEmpty()) {
          List<LoadMetadataDetails> newSegmentDetailsListForIndexTable = new ArrayList<>(
              validSegments.size());
          for (String segmentId : validSegments) {
            LoadMetadataDetails newSegmentDetailsObject = new LoadMetadataDetails();
            newSegmentDetailsObject.setLoadName(segmentId);
            newSegmentDetailsListForIndexTable.add(newSegmentDetailsObject);
          }
          for (CarbonTable indexTable : indexCarbonTables) {
            List<LoadMetadataDetails> indexTableDetailsList =
                CarbonIndexUtil.getTableStatusDetailsForIndexTable(updatedLoadMetadataDetails,
                    indexTable, newSegmentDetailsListForIndexTable);

            SegmentStatusManager.writeLoadDetailsIntoFile(
                CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath()),
                indexTableDetailsList.toArray(new LoadMetadataDetails[0]));
          }
        } else if (carbonTable.isIndexTable()) {
          SegmentStatusManager.writeLoadDetailsIntoFile(
              metaDataFilepath + CarbonCommonConstants.FILE_SEPARATOR
                  + CarbonTablePath.TABLE_STATUS_FILE,
              updatedLoadMetadataDetails.toArray(new LoadMetadataDetails[0]));
        }
        status = true;
      } else {
        LOGGER.error(
            "Not able to acquire the lock for Table status update for table " + databaseName + "."
                + tableName);
        throw new RuntimeException(
            "Not able to acquire the lock for Table status updation for table " + databaseName + "."
                + tableName);
      }
    } catch (IOException e) {
      LOGGER.error(
          "Not able to acquire the lock for Table status update for table " + databaseName + "."
              + tableName);
    } finally {
      if (carbonLock.unlock()) {
        LOGGER.info("Table unlocked successfully after table status update" + databaseName + "."
            + tableName);
      } else {
        LOGGER.error("Unable to unlock Table lock for table" + databaseName + "." + tableName
            + " during table status update");
      }
    }
    return status;
  }

  /**
   * This method read the details of SI table and check whether new metadata details are already
   * compacted, if it is, then already compaction for SI is completed and updating with new segment
   * status is useless, this can happen in case of updating the status of index while loading
   * segments for failed segments, so do not update anything, just exit gracefully
   */
  private static boolean isSegmentsAlreadyCompactedForNewMetaDataDetails(
      List<CarbonTable> indexTables, String indexTableName,
      List<LoadMetadataDetails> newLoadMetadataDetails) {
    CarbonTable currentIndexTable = null;
    for (CarbonTable indexTable : indexTables) {
      if (indexTable.getTableName().equalsIgnoreCase(indexTableName)) {
        currentIndexTable = indexTable;
        break;
      }
    }
    boolean isIndexTableSegmentsCompacted = false;
    if (null != currentIndexTable) {
      LoadMetadataDetails[] existingLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
          currentIndexTable.getMetadataPath());
      for (LoadMetadataDetails existingLoadMetaDataDetail : existingLoadMetaDataDetails) {
        for (LoadMetadataDetails newLoadMetadataDetail : newLoadMetadataDetails) {
          if (existingLoadMetaDataDetail.getLoadName().equalsIgnoreCase(
              newLoadMetadataDetail.getLoadName())
              && existingLoadMetaDataDetail.getSegmentStatus() == SegmentStatus.COMPACTED) {
            isIndexTableSegmentsCompacted = true;
            break;
          }
        }
        if (isIndexTableSegmentsCompacted) {
          break;
        }
      }
      return isIndexTableSegmentsCompacted;
    } else {
      return false;
    }
  }

  /**
   * method to update table status in case of IUD Update Delta Compaction.
   */
  public static boolean updateLoadMetadataWithMergeStatus(CarbonTable indexCarbonTable,
      String[] loadsToMerge, String mergedLoadNumber, Map<String, String> segmentToLoadStartTimeMap,
      long mergeLoadStartTime, SegmentStatus segmentStatus, long newLoadStartTime,
      List<String> rebuiltSegments) throws IOException {
    boolean tableStatusUpdateStatus = false;
    List<String> loadMergeList = new ArrayList<>(Arrays.asList(loadsToMerge));
    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(
        indexCarbonTable.getAbsoluteTableIdentifier());

    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();

    try {
      if (carbonLock.lockWithRetries()) {
        LOGGER.info("Acquired lock for the table " + indexCarbonTable.getDatabaseName() + "."
            + indexCarbonTable.getTableName() + " for table status update ");
        LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(
            indexCarbonTable.getMetadataPath());

        long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
        for (LoadMetadataDetails loadDetail : loadDetails) {
          // check if this segment is merged.
          if (loadMergeList.contains(loadDetail.getLoadName()) || loadMergeList.contains(
              loadDetail.getMergedLoadName())) {
            // if the compacted load is deleted after the start of the compaction process,
            // then need to discard the compaction process and treat it as failed compaction.
            if (loadDetail.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
                  + " is deleted after the compaction is started.");
              return false;
            }
            loadDetail.setSegmentStatus(SegmentStatus.COMPACTED);
            loadDetail.setModificationOrDeletionTimestamp(modificationOrDeletionTimeStamp);
            loadDetail.setMergedLoadName(mergedLoadNumber);
          }
        }

        // create entry for merged one.
        LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
        loadMetadataDetails.setSegmentStatus(segmentStatus);
        long loadEndDate = CarbonUpdateUtil.readCurrentTime();
        loadMetadataDetails.setLoadEndTime(loadEndDate);
        loadMetadataDetails.setLoadName(mergedLoadNumber);
        loadMetadataDetails.setSegmentFile(SegmentFileStore.genSegmentFileName(mergedLoadNumber,
            String.valueOf(segmentToLoadStartTimeMap.get(mergedLoadNumber)))
            + CarbonTablePath.SEGMENT_EXT);
        CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(loadMetadataDetails, mergedLoadNumber,
            indexCarbonTable);
        if (rebuiltSegments.contains(loadMetadataDetails.getLoadName())) {
          loadMetadataDetails.setLoadStartTime(newLoadStartTime);
        } else {
          loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
        }

        // put the merged folder entry
        for (int i = 0; i < loadDetails.length; i++) {
          if (loadDetails[i].getLoadName().equals(loadMetadataDetails.getLoadName())) {
            loadDetails[i] = loadMetadataDetails;
          }
        }

        // if this is a major compaction then set the segment as major compaction.
        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));

        SegmentStatusManager.writeLoadDetailsIntoFile(
            CarbonTablePath.getTableStatusFilePath(indexCarbonTable.getTablePath()),
            updatedDetailsList.toArray(new LoadMetadataDetails[0]));
        tableStatusUpdateStatus = true;
      } else {
        LOGGER.error(
            "Could not able to obtain lock for table" + indexCarbonTable.getDatabaseName() + "."
                + indexCarbonTable.getTableName() + "for table status update");
      }
    } finally {
      if (carbonLock.unlock()) {
        LOGGER.info("Table unlocked successfully after table status update" + indexCarbonTable
            .getDatabaseName() + "." + indexCarbonTable.getTableName());
      } else {
        LOGGER.error(
            "Unable to unlock Table lock for table" + indexCarbonTable.getDatabaseName() + "."
                + indexCarbonTable.getTableName() + " during table status update");
      }
    }
    return tableStatusUpdateStatus;
  }

  public static boolean checkMainTableSegEqualToSISeg(
      LoadMetadataDetails[] mainTableLoadMetadataDetails,
      LoadMetadataDetails[] siTableLoadMetadataDetails) throws ErrorMessage {
    return checkMainTableSegEqualToSISeg(mainTableLoadMetadataDetails, siTableLoadMetadataDetails,
        false);
  }

  /**
   * Method to check if main table and SI have same number of valid segments or not
   */
  public static boolean checkMainTableSegEqualToSISeg(
      LoadMetadataDetails[] mainTableLoadMetadataDetails,
      LoadMetadataDetails[] siTableLoadMetadataDetails, boolean isRegisterIndex)
      throws ErrorMessage {
    List<String> mainTableSegmentsList = getListOfValidSlices(mainTableLoadMetadataDetails);
    List<String> indexList = getListOfValidSlices(siTableLoadMetadataDetails);
    Collections.sort(mainTableSegmentsList);
    Collections.sort(indexList);
    // In the case when number of SI segments are more than the main table segments do nothing
    // and proceed to process the segments. Return False in case if main table segments are more
    // than SI Segments
    if (indexList.size() < mainTableSegmentsList.size()) {
      return false;
    } else if ((indexList.size() > mainTableSegmentsList.size()) && isRegisterIndex) {
      throw new ErrorMessage("Cannot register index, as the number of Secondary index table "
          + "segments are more than the main table segments. Try Drop and recreate SI.");
    }
    // There can be cases when the number of segments in the main table are less than the index
    // table. In this case mapping all the segments in main table to SI table.
    // Return False if a segment in main table is not in index table
    HashSet<String> indexTableSet = new HashSet<String>();
    for (int i = 0; i < indexList.size(); i++) {
      indexTableSet.add(indexList.get(i));
    }
    for (int i = 0; i < mainTableSegmentsList.size(); i++) {
      if (!indexTableSet.contains(mainTableSegmentsList.get(i))) {
        return false;
      }
    }
    return true;
  }

  /**
   * Method to check if main table has in progress load and same segment not present in SI
   */
  public static boolean checkInProgLoadInMainTableAndSI(CarbonTable carbonTable,
      LoadMetadataDetails[] mainTableLoadMetadataDetails,
      LoadMetadataDetails[] siTableLoadMetadataDetails) {
    List<String> allSiSlices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    for (LoadMetadataDetails oneLoad : siTableLoadMetadataDetails) {
      allSiSlices.add(oneLoad.getLoadName());
    }

    if (mainTableLoadMetadataDetails.length != 0) {
      for (LoadMetadataDetails loadDetail : mainTableLoadMetadataDetails) {
        // if load in progress and check if same load is present in SI.
        if (SegmentStatusManager.isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(),
            loadDetail.getLoadName())) {
          if (!allSiSlices.contains(loadDetail.getLoadName())) {
            return false;
          }
        }
      }
    }
    return true;
  }
}
